Spark Streaming + Kafka

适用于版本3.0.1。

新的消费者API 提供并行的、Kafka分区与Spark分区1:1、可以访问偏移和元数据的方式。与旧版本直接API在使用方式上不同。

1 连接

1
2
3
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 3.0.1

注意:不要直接引用kafka依赖。因为spark使用了过渡性依赖,影响了诊断方式的兼容性。

2 创建直接流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

每个记录是一个ConsumerRecord对象。

更多配置详见Kafka consumer config docs

当批处理间隔大于Kafka默认的心跳会话超时(30s)时,需要增加heartbeat.interval.ms 和session.timeout.ms。

批处理间隔超过5min,还需要改变中介上的group.max.session.timeout.ms。

提交策略详见Storing Offsets

3 位置策略

新API会预提取消息到缓冲中。为了提升性能,Spark会在执行器上缓存消费者(spark.streaming.kafka.consumer.cache.enabled),倾向于调度分区到相应的消费者位置。

  • PreferConsistent平均分配分区到执行器上,适合大多数场景;

  • PreferBrokers调度分区到leader所在的节点,适合执行器和broker处于相同节点;

  • PreferFixed允许显式执行分区-主机映射,其余平均分配,适合数据倾斜场景。

默认最大消费者缓存上限为64。如果需要处理超过64 * number of executors个分区,可以设置spark.streaming.kafka.consumer.cache.maxCapacity。

缓存通过主题分区和group.id识别,因此需要为每次调用createDirectStream,使用不同的group.id。

4 消费策略

ConsumerStrategies 允许Spark即使从检查点重启,也可以获取配置的消费者。

  • Subscribe订阅固定的主题集合
  • SubscribePattern使用正则指定主题,需要注意响应新加的主题分区
  • Assign订阅固定的分区集合

以上方式都可以在构造器中指定偏移。

ConsumerStrategy可以自定义扩展。

5 创建RDD

1
2
3
4
5
6
7
8
9
// Import dependencies and create kafka params as in Create Direct Stream above

val offsetRanges = Array(
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange("test", 0, 0, 100),
OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

适用于离线处理消息,通过执行主题、分区和偏移。

注意:

不能使用PreferBrokers,因为流场景以外,没有驱动端的消费者自动查找元数据。如果需要,使用PreferFixed和自定义的元数据查找。

6 获取偏移

1
2
3
4
5
6
7
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
}

注意:

  • HasOffsetRanges类型转换仅在createDirectStream创建后的第一个方法调用中成功。
  • RDD与Kafka分区一对一关系仅在shuffle或repartition前成立。

7 保存偏移

Kafka消息传递语义取决于偏移的存储方式和时机。Spark输出保证了至少一次语义。

想要实现刚好一次语义,必须实现以下一项:

  • 在幂等输出后保存偏移
  • 输出时在原子性事务中保存偏移

存储偏移主要有以下3种方式:

(1) 检查点

偏移将被保存在检查点中。

但是需要保证输出的幂等性,并且不能在代码变更后恢复检查点。

计划的代码更新可以同时运行新旧代码并保持输出幂等;而计划外的需要识别开始的偏移。

(2) Kafka

Kafka默认在消费者poll()成功后自动提交偏移。

可以使用异步提交commitAsync()在确认输出后提交偏移。

因为Kafka不是事务性,所以输出需要幂等。?

1
2
3
4
5
6
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

注意:由于HasOffsetRanges的原因,CanCommitOffsets类型转换只对createDirectStream的结果有效。

(3) 自定义

实现支持事务的偏移存储。

尤其针对难以幂等的聚合输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

val results = yourCalculation(rdd)

// begin your transaction

// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly

// end your transaction
}

8 SSL/TLS

新的Kafka消费者API支持SSL,仅用于Spark与Kafka之间的通信,内部节点间需要另外实现。

1
2
3
4
5
6
7
8
9
val kafkaParams = Map[String, Object](
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
"security.protocol" -> "SSL",
"ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
"ssl.truststore.password" -> "test1234",
"ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
"ssl.keystore.password" -> "test1234",
"ssl.key.password" -> "test1234"
)

9 部署

将spark-streaming-kafka-0-10_2.12及其依赖打包

将spark-core_2.12和spark-streaming_2.12标记为provided

详见部署

10 安全

详见Structured Streaming Security

注意:Kafka原生的sink不可用,因此代理令牌只在消费端使用。

参考资料

Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)